
MSQ profile for Brokers and Historicals. #17140

Merged: 30 commits, Oct 1, 2024
Conversation

@gianm (Contributor) commented Sep 24, 2024:

This patch adds a profile of MSQ named "Dart" that runs on Brokers and Historicals, and which is compatible with the standard SQL query API. For a higher-level description and notes on future work, refer to #17139.

This patch contains the following changes, grouped into packages.

Controller (org.apache.druid.msq.dart.controller):

The controller runs on Brokers. The main classes are:

  • DartSqlResource, which serves /druid/v2/sql/dart/.
  • DartSqlEngine and DartQueryMaker, the entry points from SQL that actually run the MSQ controller code.
  • DartControllerContext, which configures the MSQ controller.
  • DartMessageRelays, which sets up relays (see "message relays" below) to read messages from workers' DartControllerClients.
  • DartTableInputSpecSlicer, which assigns work based on a TimelineServerView.

Worker (org.apache.druid.msq.dart.worker):

The worker runs on Historicals. The main classes are:

  • DartWorkerResource, which supplies the regular MSQ WorkerResource, plus Dart-specific APIs.
  • DartWorkerRunner, which runs MSQ worker code.
  • DartWorkerContext, which configures the MSQ worker.
  • DartProcessingBuffersProvider, which provides processing buffers from sliced-up merge buffers.
  • DartDataSegmentProvider, which provides segments from the Historical's local cache.

Message relays (org.apache.druid.messages):

To avoid the need for Historicals to contact Brokers during a query, which would create opportunities for queries to get stuck, all connections are opened from Broker to Historical. This is made possible by a message relay system, where the relay server (worker) has an outbox of messages.

The relay client (controller) connects to the outbox and retrieves messages. Code for this system lives in the "server" package to keep it separate from the MSQ extension and make it easier to maintain. The worker-to-controller ControllerClient is implemented using message relays.
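The outbox idea described above can be sketched roughly as follows. This is an illustrative simplification under assumed names (`SimpleOutbox`, `send`, `poll` are hypothetical, not the actual Druid classes); the real system additionally uses futures, epochs, and long polling.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the relay-server outbox: the worker appends
// messages with increasing sequence numbers; the controller polls with the
// next sequence number it wants, which implicitly acknowledges everything
// earlier, so messages are delivered in order and exactly once.
class SimpleOutbox<T> {
  private final List<T> messages = new ArrayList<>();
  private long startSequence = 0; // sequence number of messages.get(0)

  // Worker side: enqueue a message for the controller to pick up.
  public synchronized void send(T message) {
    messages.add(message);
  }

  // Controller side: fetch all messages at or after startWatermark.
  // Polling with watermark N acknowledges (and frees) everything below N.
  public synchronized List<T> poll(long startWatermark) {
    while (startSequence < startWatermark && !messages.isEmpty()) {
      messages.remove(0); // drop acknowledged messages
      startSequence++;
    }
    return new ArrayList<>(messages);
  }
}
```

A second poll with an advanced watermark frees the messages the controller has already processed, which keeps the outbox bounded between polls.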

Other changes:

  • Controller: Added the method "hasWorker". Used by the ControllerMessageListener to notify the appropriate controllers when a worker fails.
  • WorkerResource: No longer tries to respond more than once in the "httpGetChannelData" API. This comes up when a response due to a resolved future becomes ready at about the same time as a timeout occurs.
  • MSQTaskQueryMaker: Refactor to separate out some useful functions for reuse in DartQueryMaker.
  • SqlEngine: Add "queryContext" to "resultTypeForSelect" and "resultTypeForInsert". This allows the DartSqlEngine to modify result format based on whether a "fullReport" context parameter is set.
  • LimitedOutputStream: New utility class. Used when in "fullReport" mode.
  • TimelineServerView: Add getDruidServerMetadata as a performance optimization.
  • CliHistorical: Add SegmentWrangler, so it can query inline data, lookups, etc.
  • ServiceLocation: Add "fromUri" method, relocating some code from ServiceClientImpl.
  • FixedServiceLocator: New locator for a fixed set of service locations. Useful for URI locations.

@github-actions bot added labels: Area - Batch Ingestion, Area - Querying, Area - MSQ (for multi-stage queries, https://github.com/apache/druid/issues/12262) on Sep 24, 2024
@gianm (Author) commented Sep 24, 2024:

Marked as a draft since more test coverage is needed, although the main code is reviewable.

Please refer to #17139 for a high-level description of the proposed change.

RelDataType resultTypeForInsert(RelDataTypeFactory typeFactory, RelDataType validatedRowType);
RelDataType resultTypeForInsert(
    RelDataTypeFactory typeFactory,
    RelDataType validatedRowType,

Code scanning / CodeQL notice: The parameter 'validatedRowType' is never used.

RelDataType resultTypeForInsert(
    RelDataTypeFactory typeFactory,
    RelDataType validatedRowType,
    Map<String, Object> queryContext

Code scanning / CodeQL notice: The parameter 'queryContext' is never used.
@cryptoe cryptoe added this to the 31.0.0 milestone Sep 25, 2024
@GET
@Produces(MediaType.APPLICATION_JSON)
public GetQueriesResponse doGetRunningQueries(
@QueryParam("selfOnly") final String selfOnly,
Reviewer (Contributor): nit: would a boolean be more appropriate?

@gianm (Author): I went with String so that the API can be /druid/v2/sql/dart/?selfOnly (in this case the string is set to "", so we can check for a non-null string). There might be a cleaner way to do this, but this approach is used elsewhere in the codebase. If you know a better way, please let me know.
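The convention being discussed can be shown without the JAX-RS plumbing. This is a hedged sketch of the idea only: with JAX-RS, `?selfOnly` with no value binds a String parameter to `""`, while an absent parameter binds it to `null`, so a non-null check means "flag present". The class and method names here are hypothetical.

```java
// Sketch of the presence-check convention: any non-null value, including
// the empty string produced by "?selfOnly" with no value, means the flag
// was supplied; null means it was absent from the URL entirely.
class SelfOnlyCheck {
  static boolean isSelfOnly(String selfOnlyParam) {
    return selfOnlyParam != null;
  }
}
```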

public LimitedOutputStream(OutputStream out, long limit, Function<Long, String> exceptionMessageFn)
{
this.out = out;
this.limit = limit;
Reviewer (Contributor): nit: could add a precondition for limit being >= 0.

@gianm (Author): Added.
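The shape of the constructor plus the suggested precondition might look like the following. This is an illustrative sketch (the class name `BoundedOutputStream` is hypothetical; the real `LimitedOutputStream` lives in the Druid codebase and may differ in behavior, e.g. in how it reports the overflow):

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.function.Function;

// Hedged sketch: writes beyond "limit" bytes throw an IOException built by
// exceptionMessageFn, and a negative limit is rejected up front, per the
// review suggestion.
class BoundedOutputStream extends OutputStream {
  private final OutputStream out;
  private final long limit;
  private final Function<Long, String> exceptionMessageFn;
  private long written = 0;

  BoundedOutputStream(OutputStream out, long limit, Function<Long, String> exceptionMessageFn) {
    if (limit < 0) {
      throw new IllegalArgumentException("limit must be >= 0"); // the added precondition
    }
    this.out = out;
    this.limit = limit;
    this.exceptionMessageFn = exceptionMessageFn;
  }

  @Override
  public void write(int b) throws IOException {
    if (written + 1 > limit) {
      throw new IOException(exceptionMessageFn.apply(limit));
    }
    out.write(b);
    written++;
  }
}
```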

Comment on lines +81 to +84
public String getDartQueryId()
{
return dartQueryId;
}
@LakshSingla (Contributor) commented Sep 25, 2024:

I wonder if it is better to reuse the QueryResourceId that was added as part of the merge-buffer allocation code, instead of creating a Dart query id. Both look semantically equivalent to me: a unique id that indicates the resources a query takes. Or is it better to keep them separate, in case we need them to diverge later?
In any case, should we have a separate class that explains the usages of the Dart query id, why it differs, and who populates it, all in a single place?

@gianm (Author) replied Sep 26, 2024:

Hmm, you're right that it's kind of the same idea. The downside of using QueryResourceId is that it has a javadoc explaining in detail how it's set for native queries, and adding Dart stuff to that existing javadoc might make it confusing.

The dartQueryId context parameter is also simpler than QueryResourceId: it's only set in one place (DartSqlResource) and only read in one place (DartQueryMaker), rather than potentially being set and read in multiple places. It might make sense to have a wrapper class similar to QueryResourceId, although I am not sure it's worth it given the simpler nature.

For now I added this javadoc to DartSqlEngine#CTX_DART_QUERY_ID:

  /**
   * Dart queryId must be globally unique, so we cannot use the user-provided {@link QueryContexts#CTX_SQL_QUERY_ID}
   * or {@link BaseQuery#QUERY_ID}. Instead we generate a UUID in {@link DartSqlResource#doPost}, overriding whatever
   * the user may have provided. This becomes the {@link Controller#queryId()}.
   *
   * The user-provided {@link QueryContexts#CTX_SQL_QUERY_ID} is still registered with the {@link SqlLifecycleManager}
   * for purposes of query cancellation.
   *
   * The user-provided {@link BaseQuery#QUERY_ID} is ignored.
   */
  public static final String CTX_DART_QUERY_ID = "dartQueryId";

final DartWorkerClient workerClient
)
{
this.workerIds = workerIds;
Reviewer (Contributor):

Instances of this class seem to be accessed by multiple threads. Given that the worker ids are passed in by the caller, we should probably defensively wrap them in a synchronized collection to prevent accidental mutations by callers of getWorkerIds.

@gianm (Author):

It's ok, because the workerIds list is not modified throughout the lifecycle of the query.

Comment on lines +34 to +35
* Add a controller. Throws {@link DruidException} if a controller with the same {@link Controller#queryId()} is
* already registered.
@LakshSingla (Contributor) commented Sep 25, 2024:

With the dartQueryId in place, should this be indexed based on that?

@gianm (Author):

It is indexed by that currently, since the queryId() on the controller is set to the dartQueryId. I'll add a comment to the javadoc for Controller#queryId() clarifying that.

/**
* Remove a controller from the registry.
*/
void remove(ControllerHolder holder);
Reviewer (Contributor):

Is it sufficient to send the query id argument instead of the holder, as done with get(queryId)?

Suggested change
void remove(ControllerHolder holder);
void remove(String id);

@gianm (Author) replied Sep 26, 2024:

It's like this mostly because it's simpler for the calling code to have remove accept the entire holder. The calling code is doing something like:

registry.register(holder);
// ... do stuff ...
registry.remove(holder);

So there is symmetry between register and remove.


@Provides
@LazySingleton
public ProcessingBuffersProvider createProcessingBuffersProvider(
Reviewer (Contributor):

Should these be bound to @Dart?

@gianm (Author):

It isn't necessary right now, but might be useful later so I did add the annotation.

@abhishekagarwal87 (Contributor) left a review:

Minor comments here and there.

@Override
public List<ResourceAction> getQueryPermissions(String queryId)
{
return getAdminPermissions();
Reviewer (Contributor):

So only an admin user can run Dart queries right now?

@gianm (Author):

This is just for the "internal" APIs like the controller->worker APIs. End users would go through the DartSqlResource, which doesn't use this class. I will add comments to clarify.

@gianm (Author) added Sep 30, 2024:

Just to be very clear, the permission model for Dart queries is the same as regular SQL queries. If Dart is enabled then any regular user can issue queries against the tables that they have permissions for.

workerIds.add(WorkerId.fromDruidServerMetadata(server, queryId).toString());
}

// Shuffle workerIds, so we don't bias a single server to always be worker 0 (which tends to do more work).
Reviewer (Contributor):

Why is worker 0 doing more work? Because it's the non-leaf?

Another reviewer (Contributor):

It would be the leaf as well as the non-leaf. You would always have at least one worker in each stage, and that would be worker 0, so shuffling the server ids helps us avoid loading one server, I think.

@gianm (Author) replied Sep 30, 2024:

Expanded the comment to say why:

Shuffle workerIds, so we don't bias towards specific servers when running multiple queries concurrently. For any given query, lower-numbered workers tend to do more work, because the controller prefers using lower-numbered workers when maxWorkerCount for a stage is less than the total number of workers.
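The shuffle described in that comment is a one-liner in spirit. As a hedged sketch (the class and method names are hypothetical; the real code operates on WorkerId values inside DartTableInputSpecSlicer):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Illustrative sketch: lower-numbered workers tend to do more work because
// the controller prefers them when maxWorkerCount is less than the total
// worker count, so randomizing the order spreads load across servers when
// many queries run concurrently.
class WorkerShuffle {
  static List<String> shuffledWorkerIds(List<String> workerIds, Random random) {
    final List<String> shuffled = new ArrayList<>(workerIds); // defensive copy
    Collections.shuffle(shuffled, random); // don't always make the same server worker 0
    return shuffled;
  }
}
```

The shuffle is per query, so any given server is worker 0 for only a fraction of concurrent queries.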

/**
* Registry for actively-running {@link Controller}.
*/
public interface DartControllerRegistry
Reviewer (Contributor):

is there a need for this interface?

@gianm (Author):

No, I guess not. I split it because I thought it was going to be useful in testing, but it didn't turn out to be. I'll collapse them.


// Block until messages are acknowledged.
try {
FutureUtils.getUnchecked(Futures.successfulAsList(futures), false);
Reviewer (Contributor):

IMO we should have some defensive timeout here to avoid blocking forever
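One way the suggested defensive timeout could look, sketched with plain java.util.concurrent rather than Guava's Futures (the class name, the deadline arithmetic, and the choice to mirror successfulAsList by swallowing individual failures are all assumptions for illustration):

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hedged sketch: wait for every acknowledgement future, but never longer
// than an overall deadline, so the caller cannot block forever.
class AckWaiter {
  static boolean awaitAll(List<? extends Future<?>> futures, long timeout, TimeUnit unit)
      throws InterruptedException {
    final long deadline = System.nanoTime() + unit.toNanos(timeout);
    for (Future<?> future : futures) {
      final long remainingNanos = deadline - System.nanoTime();
      if (remainingNanos <= 0) {
        return false; // ran out of time before all acks arrived
      }
      try {
        future.get(remainingNanos, TimeUnit.NANOSECONDS);
      } catch (TimeoutException e) {
        return false;
      } catch (ExecutionException e) {
        // Like successfulAsList: a failed ack does not block the others.
      }
    }
    return true;
  }
}
```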

Comment on lines 250 to 256
if (dartQueryId instanceof String) {
final ControllerHolder holder = controllerRegistry.get((String) dartQueryId);
if (holder != null) {
found = true;
holder.cancel();
}
}
Reviewer (Contributor):

can we log when the dartQueryId is not of a string type or is null

Another reviewer (Contributor):

It would be nice for this logic to sit inside the cancellable itself but that would require implementing a DartStatement I suppose.

@gianm (Author):

can we log when the dartQueryId is not of a string type or is null

Yes, added.

It would be nice for this logic to sit inside the cancellable itself but that would require implementing a DartStatement I suppose.

Yeah it would be a new type of statement. But the way the statement stuff is designed, it isn't easily extensible, so it would be a bigger project to add new statement types. Even the SqlStatementResource (MSQ async query resource) uses HttpStatement.

reportFuture = controllerExecutor.submit(() -> {
try {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final TaskReportQueryListener queryListener = new TaskReportQueryListener(
Reviewer (Contributor):

Should these classes (TaskReportQueryListener) be renamed, since they are now used outside tasks?

@gianm (Author):

Hmm. It's still writing a thing called a MSQTaskReport that extends TaskReport. So I think the name makes sense. However, it might make sense to define some common object that is used by the MSQTaskReport container and the Dart API. If it's ok I'd like to leave this thought for the future.

@Override
public void validateContext(Map<String, Object> queryContext)
{
SqlEngines.validateNoSpecialContextKeys(queryContext, MSQTaskSqlEngine.SYSTEM_CONTEXT_PARAMETERS);
Reviewer (Contributor):

IMO this class should have its own copy of MSQTaskSqlEngine.SYSTEM_CONTEXT_PARAMETERS, or maybe add a comment in MSQTaskSqlEngine that these context parameters are disallowed for the Dart engine too.

@gianm (Author):

I think it's good to have a shared static, so when keys are added we don't forget to add them in both places. So, I added a javadoc comment to MSQTaskSqlEngine.SYSTEM_CONTEXT_PARAMETERS mentioning it's used in Dart also.

final List<ProcessingBuffers> pool = new ArrayList<>(poolSize);

for (int i = 0; i < poolSize; i++) {
final int sliceSize = buffer.capacity() / poolSize / processingThreads;
Reviewer (Contributor):

This can be moved out of the loop.
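The point of the comment is that sliceSize depends only on loop-invariant values. A hedged sketch of the hoisted version (simplified to one flat list of slices; `BufferSlicer` is a hypothetical name, not the actual DartProcessingBuffersProvider):

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: compute sliceSize once before the loop, then carve
// the merge buffer into equal, non-overlapping slices.
class BufferSlicer {
  static List<ByteBuffer> slice(ByteBuffer buffer, int poolSize, int processingThreads) {
    final int sliceSize = buffer.capacity() / poolSize / processingThreads; // hoisted out of the loop
    final List<ByteBuffer> slices = new ArrayList<>();
    int position = 0;
    for (int i = 0; i < poolSize * processingThreads; i++) {
      buffer.position(position);
      buffer.limit(position + sliceSize);
      slices.add(buffer.slice()); // independent view of [position, position + sliceSize)
      position += sliceSize;
    }
    return slices;
  }
}
```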

Comment on lines +113 to +116
return getClient(workerId).asyncRequest(
new RequestBuilder(HttpMethod.POST, "/stop"),
IgnoreHttpResponseHandler.INSTANCE
);
Reviewer (Contributor):

Shouldn't we remove the worker from the map?

/**
* Relays run on clients, and receive messages from a server.
* Uses {@link MessageRelayClient} to communicate with the {@link MessageRelayResource} on a server.
* that flows upstream
Reviewer (Contributor):

is it incomplete?

@gianm (Author):

Ah, it's just erroneous text. I deleted it.

@abhishekagarwal87 (Contributor) commented:
In future, we should look into merging message relays with HTTP sync framework that is used by HTTP based segment and task management.

@kfaraz (Contributor) commented Sep 30, 2024:

In future, we should look into merging message relays with HTTP sync framework that is used by HTTP based segment and task management.

Could you elaborate? Do you mean use the same base client object like ServiceClient?

{
// Fail workers when they're added, because when they're added, they shouldn't be running anything. If they are
// running something, cancel it.
workerFailed(node);
Reviewer (Contributor):

This doesn't seem to generate the right error message if the server is added

@gianm (Author) replied Oct 1, 2024:

Good point. I just removed this, I think it isn't needed anyway.

Reviewer (Contributor):

It would be helpful to have an "uber-doc" on how the message relays can be utilized and a high-level flow.

In the future, could stuff like HttpServerInventoryView be refactored to utilize these methods?

Another reviewer (Contributor):

+1 even I found it confusing.

@gianm (Author):

I've added a package-level javadoc about that to server/src/main/java/org/apache/druid/messages/package-info.java. Yeah, I think it could be used in the future by the inventory view. Here's the comment:

/**
 * Message relays provide a mechanism to send messages from server to client using long polling. The messages are
 * sent in order, with acknowledgements from client to server when a message has been successfully delivered.
 *
 * This is useful when there is some need for some "downstream" servers to send low-latency messages to some
 * "upstream" server, but where establishing connections from downstream servers to upstream servers would not be
 * desirable. This is typically done when upstream servers want to keep state in-memory that is updated incrementally
 * by downstream servers, and where there may be lots of instances of downstream servers.
 *
 * This structure has two main benefits. First, it prevents upstream servers from being overwhelmed by connections
 * from downstream servers. Second, it allows upstream servers to drive the updates of their own state, and better
 * handle events like restarts and leader changes.
 *
 * On the downstream (server) side, messages are placed into an {@link org.apache.druid.messages.server.Outbox}
 * and served by a {@link org.apache.druid.messages.server.MessageRelayResource}.
 *
 * On the upstream (client) side, messages are retrieved by {@link org.apache.druid.messages.client.MessageRelays}
 * using {@link org.apache.druid.messages.client.MessageRelayClient}.
 *
 * This is currently used by Dart (multi-stage-query engine running on Brokers and Historicals) to implement
 * worker-to-controller messages. In the future it may also be used to implement
 * {@link org.apache.druid.server.coordination.ChangeRequestHttpSyncer}.
 */

import org.apache.druid.messages.client.MessageRelay;

/**
* An outbox for messages sent from servers to clients.
Reviewer (Contributor):

Question: Is outbox a batching mechanism, or also a way to order through the incoming messages?

@gianm (Author):

It does do batching but it's also order-preserving. I added a comment about that.


public OutboxQueue()
{
this.epoch = ThreadLocalRandom.current().nextLong() & Long.MAX_VALUE;
Reviewer (Contributor):

I wonder if it's simpler to have an auto-incrementing counter here instead?

@gianm (Author):

This also needs to be different from a previous incarnation of the same host after a JVM restart, so it can't simply start at zero. I figured a random 63-bit int would be sufficient.
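The trick in that constructor is worth spelling out: masking a random long with Long.MAX_VALUE clears the sign bit, so the epoch is always non-negative, and drawing it fresh at JVM start distinguishes restarts of the same host. A minimal sketch (the wrapper class name is hypothetical):

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch of the epoch generation discussed above: nextLong() can be
// negative, but ANDing with Long.MAX_VALUE (0x7fff...f) zeroes the sign
// bit, yielding a uniform random value in [0, Long.MAX_VALUE].
class Epochs {
  static long newEpoch() {
    return ThreadLocalRandom.current().nextLong() & Long.MAX_VALUE;
  }
}
```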

@cryptoe (Contributor) left a review:

Mostly minor comments. LGTM
🚀

/**
* Query has been canceled.
*/
CANCELED
Reviewer (Contributor):

Should there be a state called finished?

@gianm (Author):

Controllers are removed as soon as they finish, so a finished state isn't necessary.


private void workerFailed(final DruidNode node)
{
for (final ControllerHolder holder : controllerRegistry.getAllHolders()) {
Reviewer (Contributor):

Should there be APIs on the worker to cancel all work? Let's say a Broker switch happens and we do not have the controller for whatever reason; in that case, all Dart work on the Historical should stop, no?

@gianm (Author):

That happens in the DartWorkerRunner#BrokerListener, which is code on the worker side. It cancels all work associated with a given Broker when that Broker goes offline.


/**
* Production implementation of {@link DartControllerRegistry}.
*/
public class DartControllerRegistryImpl implements DartControllerRegistry
Reviewer (Contributor):

How would this work with multiple Brokers? What would happen if the Broker gets nuked? Shouldn't it be backed by a persistent store like MySQL?

Another reviewer (Contributor):

IMO it's OK for the query to fail if the Broker restarts.

@gianm (Author):

Yes, my thinking was that Dart queries are more ephemeral in nature.

/**
* Returns whether this controllerId replaces another one, i.e., if the host is the same and epoch is greater.
*/
public boolean replaces(final ControllerServerId otherId)
Reviewer (Contributor):

I could not find references of this.

Another reviewer (Contributor):

Or maybe newer/latest is a better method name.

@gianm (Author) replied Oct 1, 2024:

Ah, it turns out this was left over from a previous iteration. I've removed the entire class. Things that used it just use the controller host by itself now, which is a String.

int nextRoundRobinWorker = 0;
for (final QueryableDataSegment segment : prunedSegments) {
final int worker;
if (segment.getWorkerNumber() == UNKNOWN) {
Reviewer (Contributor):

Should we prune out such segments, until we hook up on-demand segment downloading on Historicals?

@gianm (Author):

I was thinking that at this point, it's better to fail the query if there is some race (or even bug?) that leads to an UNKNOWN worker. If we pruned the segment out then we might get partial results without realizing it.
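The loop being discussed boils down to a round-robin fallback for segments whose worker is unknown. A hedged sketch of that control flow (names and the list-of-ints representation are illustrative; per the author's comment above, a production version might instead fail the query on UNKNOWN to avoid silent partial results):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: segments with a known worker keep it; segments with
// UNKNOWN worker are spread round-robin across the available workers.
class WorkerAssigner {
  static final int UNKNOWN = -1;

  static List<Integer> assign(List<Integer> segmentWorkerNumbers, int workerCount) {
    final List<Integer> assignments = new ArrayList<>();
    int nextRoundRobinWorker = 0;
    for (final int workerNumber : segmentWorkerNumbers) {
      if (workerNumber == UNKNOWN) {
        assignments.add(nextRoundRobinWorker);
        nextRoundRobinWorker = (nextRoundRobinWorker + 1) % workerCount;
      } else {
        assignments.add(workerNumber);
      }
    }
    return assignments;
  }
}
```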

import java.util.stream.Collectors;

@Path(DartSqlResource.PATH + '/')
public class DartSqlResource extends SqlResource
Reviewer (Contributor):

Shouldn't SqlStatementResource be a better endpoint for this? The execEngine could be "dart"?

@gianm (Author) replied Sep 30, 2024:

I wanted Dart queries to be API-compatible with the regular SQL API at /druid/v2/sql/. I added a comment about this:

/**
 * Resource for Dart queries. API-compatible with {@link SqlResource}, so clients can be pointed from
 * {@code /druid/v2/sql/} to {@code /druid/v2/sql/dart/} without code changes.
 */
@Path(DartSqlResource.PATH + '/')
public class DartSqlResource extends SqlResource

I do also think it would make sense to add an engine option to SqlStatementResource in the future.

*
* @return future that resolves to the next batch of messages
*/
ListenableFuture<MessageBatch<MessageType>> getMessages(String clientHost, long epoch, long startWatermark);
Contributor

It's unclear here whether the order is maintained.
This looks like something that the inventory view could use in the future, no?

Contributor Author

Order is maintained. I've added a comment about that. And yeah, it could be used in the future by the inventory view.
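A minimal in-memory sketch of the ordered-outbox contract discussed here (simplified names, not the actual Druid classes): messages are delivered in send order, and the client resumes from a watermark, the index of the first message it has not yet acknowledged. The real implementation is asynchronous and returns a `ListenableFuture`, and uses the epoch parameter to detect a restarted server; this sketch is synchronous and omits the epoch for clarity.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified outbox: ordered delivery with resume-from-watermark.
public class Outbox<T>
{
  private final List<T> messages = new ArrayList<>();

  public synchronized void sendMessage(T message)
  {
    messages.add(message);
  }

  /** Returns all messages at or after startWatermark, in send order. */
  public synchronized List<T> getMessages(long startWatermark)
  {
    return new ArrayList<>(messages.subList((int) startWatermark, messages.size()));
  }
}
```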

Contributor

+1, I found it confusing too.


import java.util.Objects;

public class QueryableDataSegment
Contributor

Nit: could we rename this class to something Dart- or MSQ-specific, since "worker" is a concept limited to those engines?
We have a lot of similar DataSegment classes, which causes confusion.

Contributor Author

Renamed to DartQueryableSegment.

@abhishekagarwal87
Contributor

In the future, we should look into merging message relays with the HTTP sync framework that is used by HTTP-based segment and task management.

Could you elaborate? Do you mean using the same base client object, like ServiceClient?

No, I was referring to https://github.com/apache/druid/blob/master/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java
It does similar stuff where the client needs to get incremental updates from the server.

- Name controller threads properly.
- Close worker clients when workers go offline or when the controller exits.

Worker -> controller:

- Cancel futures when the controller goes offline.

Worker -> worker:

- Close workerClient when a worker is canceled.
@gianm gianm marked this pull request as ready for review October 1, 2024 08:57
@gianm
Contributor Author

gianm commented Oct 1, 2024

Thank you for the reviews @abhishekagarwal87 @cryptoe @LakshSingla.

@gianm gianm merged commit 878adff into apache:master Oct 1, 2024
90 checks passed
@gianm gianm deleted the msq-dart branch October 1, 2024 21:38
kfaraz pushed a commit to kfaraz/druid that referenced this pull request Oct 4, 2024
This patch adds a profile of MSQ named "Dart" that runs on Brokers and
Historicals, and which is compatible with the standard SQL query API.
For more high-level description, and notes on future work, refer to apache#17139.

This patch contains the following changes, grouped into packages.

Controller (org.apache.druid.msq.dart.controller):

The controller runs on Brokers. Main classes are,

- DartSqlResource, which serves /druid/v2/sql/dart/.
- DartSqlEngine and DartQueryMaker, the entry points from SQL that actually
  run the MSQ controller code.
- DartControllerContext, which configures the MSQ controller.
- DartMessageRelays, which sets up relays (see "message relays" below) to read
  messages from workers' DartControllerClients.
- DartTableInputSpecSlicer, which assigns work based on a TimelineServerView.

Worker (org.apache.druid.msq.dart.worker)

The worker runs on Historicals. Main classes are,

- DartWorkerResource, which supplies the regular MSQ WorkerResource, plus
  Dart-specific APIs.
- DartWorkerRunner, which runs MSQ worker code.
- DartWorkerContext, which configures the MSQ worker.
- DartProcessingBuffersProvider, which provides processing buffers from
  sliced-up merge buffers.
- DartDataSegmentProvider, which provides segments from the Historical's
  local cache.

Message relays (org.apache.druid.messages):

To avoid the need for Historicals to contact Brokers during a query, which
would create opportunities for queries to get stuck, all connections are
opened from Broker to Historical. This is made possible by a message relay
system, where the relay server (worker) has an outbox of messages.

The relay client (controller) connects to the outbox and retrieves messages.
Code for this system lives in the "server" package to keep it separate from
the MSQ extension and make it easier to maintain. The worker-to-controller
ControllerClient is implemented using message relays.

Other changes:

- Controller: Added the method "hasWorker". Used by the ControllerMessageListener
  to notify the appropriate controllers when a worker fails.
- WorkerResource: No longer tries to respond more than once in the
  "httpGetChannelData" API. This comes up when a response due to resolved future
  is ready at about the same time as a timeout occurs.
- MSQTaskQueryMaker: Refactor to separate out some useful functions for reuse
  in DartQueryMaker.
- SqlEngine: Add "queryContext" to "resultTypeForSelect" and "resultTypeForInsert".
  This allows the DartSqlEngine to modify result format based on whether a "fullReport"
  context parameter is set.
- LimitedOutputStream: New utility class. Used when in "fullReport" mode.
- TimelineServerView: Add getDruidServerMetadata as a performance optimization.
- CliHistorical: Add SegmentWrangler, so it can query inline data, lookups, etc.
- ServiceLocation: Add "fromUri" method, relocating some code from ServiceClientImpl.
- FixedServiceLocator: New locator for a fixed set of service locations. Useful for
  URI locations.
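The LimitedOutputStream utility mentioned in the commit message above could look roughly like the sketch below, which bounds the number of bytes written and fails once the limit would be exceeded. This is an illustrative class with a hypothetical name; the real utility's API may differ.

```java
import java.io.IOException;
import java.io.OutputStream;

// Illustrative byte-limited stream: useful when buffering a potentially
// large response (e.g. a "fullReport" result) with a hard cap.
public class BoundedOutputStream extends OutputStream
{
  private final OutputStream delegate;
  private final long limit;
  private long written;

  public BoundedOutputStream(OutputStream delegate, long limit)
  {
    this.delegate = delegate;
    this.limit = limit;
  }

  @Override
  public void write(int b) throws IOException
  {
    if (written + 1 > limit) {
      throw new IOException("Output limit [" + limit + "] exceeded");
    }
    delegate.write(b);
    written++;
  }
}
```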
kfaraz added a commit that referenced this pull request Oct 4, 2024
…#17244)

Backport for the following patches
* MSQ profile for Brokers and Historicals. (#17140)
* Remove workerId parameter from postWorkerError. (#17072)
---------
Co-authored-by: Gian Merlino <[email protected]>
gianm added a commit to gianm/druid that referenced this pull request Oct 11, 2024
The timeout handler should fire if the response has not been handled yet
(i.e. if responseResolved was previously false). However, it erroneously
fires only if the response *was* handled. This causes HTTP 500 errors if
the timeout actually does fire. The timeout is 30 seconds, which can be
hit during pipelined queries, if an earlier stage of the query hasn't
produced its first frame within 30 seconds.

This fixes a regression introduced in apache#17140.
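The race described above can be sketched as follows (simplified names, not the actual Druid code): the response path and the timeout handler race to resolve a one-shot flag, and exactly one side may send a response. A `compareAndSet` expresses the corrected condition directly: the timeout responds only when it wins the race, i.e. when the real response has not been handled yet.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified model of the response/timeout race.
public class ResponseRace
{
  private final AtomicBoolean responseResolved = new AtomicBoolean(false);

  /** Called when channel data arrives. Returns true if this call may send the response. */
  public boolean onData()
  {
    return responseResolved.compareAndSet(false, true);
  }

  /**
   * Called when the timeout fires. The regression responded when the flag was
   * already true (a second response, hence HTTP 500); the fix is to respond
   * only when this call is the first to resolve the flag.
   */
  public boolean onTimeout()
  {
    return responseResolved.compareAndSet(false, true);
  }
}
```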
abhishekagarwal87 pushed a commit that referenced this pull request Oct 11, 2024
abhishekagarwal87 pushed a commit to abhishekagarwal87/druid that referenced this pull request Oct 11, 2024
…he#17328)

kfaraz pushed a commit that referenced this pull request Oct 11, 2024
…) (#17330)


Co-authored-by: Gian Merlino <[email protected]>
Labels
Area - Batch Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 Area - Querying